/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1

#include <thrift/Thrift.h>
#include <thrift/server/TServer.h>
#include <thrift/transport/PlatformSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/ThreadManager.h>
#include <climits>
#include <thrift/concurrency/Thread.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/Mutex.h>
#include <stack>
#include <vector>
#include <string>
#include <cstdlib>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <event.h>



namespace apache
{
namespace thrift
{
namespace server
{

using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::transport::TSocket;
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
using apache::thrift::concurrency::PlatformThreadFactory;
using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::Mutex;
using apache::thrift::concurrency::Guard;

#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// assume latest version 1 series
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif

#if LIBEVENT_VERSION_NUMBER < 0x02000000
typedef THRIFT_SOCKET evutil_socket_t;
#endif

#ifndef SOCKOPT_CAST_T
#   ifndef _WIN32
#       define SOCKOPT_CAST_T void
#   else
#       define SOCKOPT_CAST_T char
#   endif // _WIN32
#endif

/**
 * Convert a typed pointer into the const option-value pointer type that
 * setsockopt()/getsockopt() expect on this platform (SOCKOPT_CAST_T is
 * void on POSIX and char on Windows).
 *
 * @param ptr typed pointer to the option value buffer.
 * @return the same address, viewed as a const SOCKOPT_CAST_T pointer.
 */
template<class T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* ptr)
{
    const SOCKOPT_CAST_T* raw = reinterpret_cast<const SOCKOPT_CAST_T*>(ptr);
    return raw;
}

/**
 * Convert a typed pointer into the mutable option-value pointer type that
 * setsockopt()/getsockopt() expect on this platform (SOCKOPT_CAST_T is
 * void on POSIX and char on Windows).
 *
 * @param ptr typed pointer to the option value buffer.
 * @return the same address, viewed as a SOCKOPT_CAST_T pointer.
 */
template<class T>
inline SOCKOPT_CAST_T* cast_sockopt(T* ptr)
{
    SOCKOPT_CAST_T* raw = reinterpret_cast<SOCKOPT_CAST_T*>(ptr);
    return raw;
}

/**
 * This is a non-blocking server in C++ for high performance that
 * operates a set of IO threads (by default only one). It assumes that
 * all incoming requests are framed with a 4 byte length indicator and
 * writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with select.
 *
 */


/// Overload condition actions: what the server does once serverOverloaded()
/// reports an overload state.
enum TOverloadAction
{
    T_OVERLOAD_NO_ACTION,        ///< Don't handle overload
    T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately
    T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue
};

class TNonblockingIOThread;

class TNonblockingServer : public TServer
{
private:
    // Per-connection state; declared here, defined outside this header.
    class TConnection;

    friend class TNonblockingIOThread;
private:
    /// Listen backlog
    static const int LISTEN_BACKLOG = 1024;

    /// Default limit on size of idle connection pool
    static const size_t CONNECTION_STACK_LIMIT = 1024;

    /// Default limit on frame size
    static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;

    /// Default limit on total number of connected sockets
    static const int MAX_CONNECTIONS = INT_MAX;

    /// Default limit on connections in handler/task processing
    static const int MAX_ACTIVE_PROCESSORS = INT_MAX;

    /// Default size of write buffer
    static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;

    /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
    static const int IDLE_READ_BUFFER_LIMIT = 1024;

    /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
    static const int IDLE_WRITE_BUFFER_LIMIT = 1024;

    /// # of calls before resizing oversized buffers (0 = check only on close)
    static const int RESIZE_BUFFER_EVERY_N = 512;

    /// # of IO threads to use by default
    static const int DEFAULT_IO_THREADS = 1;

    /// # of IO threads this server will use
    size_t numIOThreads_;

    /// Whether to set high scheduling priority for IO threads
    bool useHighPriorityIOThreads_;

    /// Server socket file descriptor
    THRIFT_SOCKET serverSocket_;

    /// Port server runs on
    int port_;

    /// The optional user-provided event-base (for single-thread servers)
    event_base* userEventBase_;

    /// For processing via thread pool, may be NULL
    boost::shared_ptr<ThreadManager> threadManager_;

    /// Is thread pool processing?
    bool threadPoolProcessing_;

    // Factory to create the IO threads
    boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;

    // Vector of IOThread objects that will handle our IO
    std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;

    // Index of next IO Thread to be used (for round-robin)
    uint32_t nextIOThread_;

    // Synchronizes access to connection stack and similar data
    Mutex connMutex_;

    /// Number of TConnection object we've created
    size_t numTConnections_;

    /// Number of Connections processing or waiting to process
    size_t numActiveProcessors_;

    /// Limit for how many TConnection objects to cache
    size_t connectionStackLimit_;

    /// Limit for number of connections processing or waiting to process
    size_t maxActiveProcessors_;

    /// Limit for number of open connections
    size_t maxConnections_;

    /// Limit for frame size
    size_t maxFrameSize_;

    /// Time in milliseconds before an unperformed task expires (0 == infinite).
    int64_t taskExpireTime_;

    /**
     * Hysteresis for overload state.  This is the fraction of the overload
     * value that needs to be reached before the overload state is cleared;
     * must be <= 1.0.
     */
    double overloadHysteresis_;

    /// Action to take when we're overloaded.
    TOverloadAction overloadAction_;

    /**
     * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
     * and found to be exceeded, reinitialized) to this size.
     */
    size_t writeBufferDefaultSize_;

    /**
     * Max read buffer size for an idle TConnection.  When we place an idle
     * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
     * we will free the buffer (such that it will be reinitialized by the next
     * received frame) if it has exceeded this limit.  0 disables this check.
     */
    size_t idleReadBufferLimit_;

    /**
     * Max write buffer size for an idle connection.  When we place an idle
     * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
     * we ensure that its write buffer is <= to this size; otherwise we
     * replace it with a new one of writeBufferDefaultSize_ bytes to ensure that
     * idle connections don't hog memory. 0 disables this check.
     */
    size_t idleWriteBufferLimit_;

    /**
     * Every N calls we check the buffer size limits on a connected TConnection.
     * 0 disables (i.e. the checks are only done when a connection closes).
     */
    int32_t resizeBufferEveryN_;

    /// Set if we are currently in an overloaded state.
    bool overloaded_;

    /// Count of connections dropped since overload started
    uint32_t nConnectionsDropped_;

    /// Count of connections dropped on overload since server started
    uint64_t nTotalConnectionsDropped_;

    /**
     * This is a stack of all the objects that have been created but that
     * are NOT currently in use. When we close a connection, we place it on this
     * stack so that the object can be reused later, rather than freeing the
     * memory and reallocating a new object later.
     */
    std::stack<TConnection*> connectionStack_;

    /**
     * This container holds pointers to all active connections. This container
     * allows the server to clean up unclosed connection objects at destruction,
     * which in turn allows their transports, protocols, processors and handlers
     * to deallocate and clean up correctly.
     */
    std::vector<TConnection*> activeConnections_;

    /**
     * Called when server socket had something happen.  We accept all waiting
     * client connections on listen socket fd and assign TConnection objects
     * to handle those requests.
     *
     * @param fd the listen socket.
     * @param which the event flag that triggered the handler.
     */
    void handleEvent(THRIFT_SOCKET fd, short which);

    /**
     * Shared constructor body: sets every member to its default value.
     * Called by all constructor overloads before any factory/manager setters.
     *
     * @param port the TCP port the server will listen on.
     */
    void init(int port)
    {
        serverSocket_ = THRIFT_INVALID_SOCKET;
        numIOThreads_ = DEFAULT_IO_THREADS;
        nextIOThread_ = 0;
        useHighPriorityIOThreads_ = false;
        port_ = port;
        userEventBase_ = NULL;
        threadPoolProcessing_ = false;
        numTConnections_ = 0;
        numActiveProcessors_ = 0;
        connectionStackLimit_ = CONNECTION_STACK_LIMIT;
        maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
        maxConnections_ = MAX_CONNECTIONS;
        maxFrameSize_ = MAX_FRAME_SIZE;
        taskExpireTime_ = 0;
        overloadHysteresis_ = 0.8;
        overloadAction_ = T_OVERLOAD_NO_ACTION;
        writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
        idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
        idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
        resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
        overloaded_ = false;
        nConnectionsDropped_ = 0;
        nTotalConnectionsDropped_ = 0;
    }

public:
    /// Construct from a processor factory; protocol/transport factories keep
    /// the TServer defaults.
    template<typename ProcessorFactory>
    TNonblockingServer(
        const boost::shared_ptr<ProcessorFactory>& processorFactory,
        int port,
        THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
        TServer(processorFactory)
    {
        init(port);
    }

    /// Construct from a single processor; protocol/transport factories keep
    /// the TServer defaults.
    template<typename Processor>
    TNonblockingServer(const boost::shared_ptr<Processor>& processor,
                       int port,
                       THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
        TServer(processor)
    {
        init(port);
    }

    /// Construct from a processor factory with one protocol factory used for
    /// both input and output, and an optional thread manager for task dispatch.
    template<typename ProcessorFactory>
    TNonblockingServer(
        const boost::shared_ptr<ProcessorFactory>& processorFactory,
        const boost::shared_ptr<TProtocolFactory>& protocolFactory,
        int port,
        const boost::shared_ptr<ThreadManager>& threadManager =
            boost::shared_ptr<ThreadManager>(),
        THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
        TServer(processorFactory)
    {

        init(port);

        setInputProtocolFactory(protocolFactory);
        setOutputProtocolFactory(protocolFactory);
        setThreadManager(threadManager);
    }

    /// Construct from a single processor with one protocol factory used for
    /// both input and output, and an optional thread manager for task dispatch.
    template<typename Processor>
    TNonblockingServer(
        const boost::shared_ptr<Processor>& processor,
        const boost::shared_ptr<TProtocolFactory>& protocolFactory,
        int port,
        const boost::shared_ptr<ThreadManager>& threadManager =
            boost::shared_ptr<ThreadManager>(),
        THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
        TServer(processor)
    {

        init(port);

        setInputProtocolFactory(protocolFactory);
        setOutputProtocolFactory(protocolFactory);
        setThreadManager(threadManager);
    }

    /// Fully-specified construction: separate input/output transport and
    /// protocol factories, plus an optional thread manager.
    template<typename ProcessorFactory>
    TNonblockingServer(
        const boost::shared_ptr<ProcessorFactory>& processorFactory,
        const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
        const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
        const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
        const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
        int port,
        const boost::shared_ptr<ThreadManager>& threadManager =
            boost::shared_ptr<ThreadManager>(),
        THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
        TServer(processorFactory)
    {

        init(port);

        setInputTransportFactory(inputTransportFactory);
        setOutputTransportFactory(outputTransportFactory);
        setInputProtocolFactory(inputProtocolFactory);
        setOutputProtocolFactory(outputProtocolFactory);
        setThreadManager(threadManager);
    }

    /// Fully-specified construction from a single processor: separate
    /// input/output transport and protocol factories, plus an optional
    /// thread manager.
    template<typename Processor>
    TNonblockingServer(
        const boost::shared_ptr<Processor>& processor,
        const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
        const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
        const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
        const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
        int port,
        const boost::shared_ptr<ThreadManager>& threadManager =
            boost::shared_ptr<ThreadManager>(),
        THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
        TServer(processor)
    {

        init(port);

        setInputTransportFactory(inputTransportFactory);
        setOutputTransportFactory(outputTransportFactory);
        setInputProtocolFactory(inputProtocolFactory);
        setOutputProtocolFactory(outputProtocolFactory);
        setThreadManager(threadManager);
    }

    ~TNonblockingServer();

    void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);

    boost::shared_ptr<ThreadManager> getThreadManager()
    {
        return threadManager_;
    }

    /**
     * Sets the number of IO threads used by this server. Can only be used before
     * the call to serve() and has no effect afterwards.  We always use a
     * PlatformThreadFactory for the IO worker threads, because they must be
     * joinable for clean shutdown.
     */
    void setNumIOThreads(size_t numThreads)
    {
        numIOThreads_ = numThreads;
    }

    /** Return whether the IO threads will get high scheduling priority */
    bool useHighPriorityIOThreads() const
    {
        return useHighPriorityIOThreads_;
    }

    /** Set whether the IO threads will get high scheduling priority. */
    void setUseHighPriorityIOThreads(bool val)
    {
        useHighPriorityIOThreads_ = val;
    }

    /** Return the number of IO threads used by this server. */
    size_t getNumIOThreads() const
    {
        return numIOThreads_;
    }

    /**
     * Get the maximum number of unused TConnection we will hold in reserve.
     *
     * @return the current limit on TConnection pool size.
     */
    size_t getConnectionStackLimit() const
    {
        return connectionStackLimit_;
    }

    /**
     * Set the maximum number of unused TConnection we will hold in reserve.
     *
     * @param sz the new limit for TConnection pool size.
     */
    void setConnectionStackLimit(size_t sz)
    {
        connectionStackLimit_ = sz;
    }

    bool isThreadPoolProcessing() const
    {
        return threadPoolProcessing_;
    }

    // NOTE(review): dereferences threadManager_ without a null check; callers
    // appear responsible for ensuring a thread manager has been set — confirm.
    void addTask(boost::shared_ptr<Runnable> task)
    {
        threadManager_->add(task, 0LL, taskExpireTime_);
    }

    /**
     * Return the count of sockets currently connected to.
     *
     * @return count of connected sockets.
     */
    size_t getNumConnections() const
    {
        return numTConnections_;
    }

    /**
     * Return the count of connections currently in use (i.e. connected
     * sockets minus the idle objects sitting in the connection pool).
     *
     * @return count of active connections.
     */
    size_t getNumActiveConnections() const
    {
        return getNumConnections() - getNumIdleConnections();
    }

    /**
     * Return the count of connection objects allocated but not in use.
     *
     * @return count of idle connection objects.
     */
    size_t getNumIdleConnections() const
    {
        return connectionStack_.size();
    }

    /**
     * Return count of number of connections which are currently processing.
     * This is defined as a connection where all data has been received and
     * either assigned a task (when threading) or passed to a handler (when
     * not threading), and where the handler has not yet returned.
     *
     * @return # of connections currently processing.
     */
    size_t getNumActiveProcessors() const
    {
        return numActiveProcessors_;
    }

    /// Increment the count of connections currently processing.
    void incrementActiveProcessors()
    {
        Guard g(connMutex_);
        ++numActiveProcessors_;
    }

    /// Decrement the count of connections currently processing.
    void decrementActiveProcessors()
    {
        Guard g(connMutex_);

        // Guard against underflow of the unsigned counter.
        if (numActiveProcessors_ > 0)
        {
            --numActiveProcessors_;
        }
    }

    /**
     * Get the maximum # of connections allowed before overload.
     *
     * @return current setting.
     */
    size_t getMaxConnections() const
    {
        return maxConnections_;
    }

    /**
     * Set the maximum # of connections allowed before overload.
     *
     * @param maxConnections new setting for maximum # of connections.
     */
    void setMaxConnections(size_t maxConnections)
    {
        maxConnections_ = maxConnections;
    }

    /**
     * Get the maximum # of connections waiting in handler/task before overload.
     *
     * @return current setting.
     */
    size_t getMaxActiveProcessors() const
    {
        return maxActiveProcessors_;
    }

    /**
     * Set the maximum # of connections waiting in handler/task before overload.
     *
     * @param maxActiveProcessors new setting for maximum # of active processes.
     */
    void setMaxActiveProcessors(size_t maxActiveProcessors)
    {
        maxActiveProcessors_ = maxActiveProcessors;
    }

    /**
     * Get the maximum allowed frame size.
     *
     * If a client tries to send a message larger than this limit,
     * its connection will be closed.
     *
     * @return Maximum frame size, in bytes.
     */
    size_t getMaxFrameSize() const
    {
        return maxFrameSize_;
    }

    /**
     * Set the maximum allowed frame size.
     *
     * @param maxFrameSize The new maximum frame size.
     */
    void setMaxFrameSize(size_t maxFrameSize)
    {
        maxFrameSize_ = maxFrameSize;
    }

    /**
     * Get fraction of maximum limits before an overload condition is cleared.
     *
     * @return hysteresis fraction
     */
    double getOverloadHysteresis() const
    {
        return overloadHysteresis_;
    }

    /**
     * Set fraction of maximum limits before an overload condition is cleared.
     * A good value would probably be between 0.5 and 0.9.
     *
     * @param hysteresisFraction fraction <= 1.0.  Values outside (0.0, 1.0]
     * are silently ignored.
     */
    void setOverloadHysteresis(double hysteresisFraction)
    {
        if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0)
        {
            overloadHysteresis_ = hysteresisFraction;
        }
    }

    /**
     * Get the action the server will take on overload.
     *
     * @return a TOverloadAction enum value for the currently set action.
     */
    TOverloadAction getOverloadAction() const
    {
        return overloadAction_;
    }

    /**
     * Set the action the server is to take on overload.
     *
     * @param overloadAction a TOverloadAction enum value for the action.
     */
    void setOverloadAction(TOverloadAction overloadAction)
    {
        overloadAction_ = overloadAction;
    }

    /**
     * Get the time in milliseconds after which a task expires (0 == infinite).
     *
     * @return a 64-bit time in milliseconds.
     */
    int64_t getTaskExpireTime() const
    {
        return taskExpireTime_;
    }

    /**
     * Set the time in milliseconds after which a task expires (0 == infinite).
     *
     * @param taskExpireTime a 64-bit time in milliseconds.
     */
    void setTaskExpireTime(int64_t taskExpireTime)
    {
        taskExpireTime_ = taskExpireTime;
    }

    /**
     * Determine if the server is currently overloaded.
     * This function checks the maximums for open connections and connections
     * currently in processing, and sets an overload condition if they are
     * exceeded.  The overload will persist until both values are below the
     * current hysteresis fraction of their maximums.
     *
     * @return true if an overload condition exists, false if not.
     */
    bool serverOverloaded();

    /** Pop and discard next task on threadpool wait queue.
     *
     * @return true if a task was discarded, false if the wait queue was empty.
     */
    bool drainPendingTask();

    /**
     * Get the starting size of a TConnection object's write buffer.
     *
     * @return # bytes we initialize a TConnection object's write buffer to.
     */
    size_t getWriteBufferDefaultSize() const
    {
        return writeBufferDefaultSize_;
    }

    /**
     * Set the starting size of a TConnection object's write buffer.
     *
     * @param size # bytes we initialize a TConnection object's write buffer to.
     */
    void setWriteBufferDefaultSize(size_t size)
    {
        writeBufferDefaultSize_ = size;
    }

    /**
     * Get the maximum size of read buffer allocated to idle TConnection objects.
     *
     * @return # bytes beyond which we will dealloc idle buffer.
     */
    size_t getIdleReadBufferLimit() const
    {
        return idleReadBufferLimit_;
    }

    /**
     * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
     * Get the maximum size of read buffer allocated to idle TConnection objects.
     *
     * @return # bytes beyond which we will dealloc idle buffer.
     */
    size_t getIdleBufferMemLimit() const
    {
        return idleReadBufferLimit_;
    }

    /**
     * Set the maximum size read buffer allocated to idle TConnection objects.
     * If a TConnection object is found (either on connection close or between
     * calls when resizeBufferEveryN_ is set) with more than this much memory
     * allocated to its read buffer, we free it and allow it to be reinitialized
     * on the next received frame.
     *
     * @param limit of bytes beyond which we will shrink buffers when checked.
     */
    void setIdleReadBufferLimit(size_t limit)
    {
        idleReadBufferLimit_ = limit;
    }

    /**
     * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
     * Set the maximum size read buffer allocated to idle TConnection objects.
     * If a TConnection object is found (either on connection close or between
     * calls when resizeBufferEveryN_ is set) with more than this much memory
     * allocated to its read buffer, we free it and allow it to be reinitialized
     * on the next received frame.
     *
     * @param limit of bytes beyond which we will shrink buffers when checked.
     */
    void setIdleBufferMemLimit(size_t limit)
    {
        idleReadBufferLimit_ = limit;
    }



    /**
     * Get the maximum size of write buffer allocated to idle TConnection objects.
     *
     * @return # bytes beyond which we will reallocate buffers when checked.
     */
    size_t getIdleWriteBufferLimit() const
    {
        return idleWriteBufferLimit_;
    }

    /**
     * Set the maximum size write buffer allocated to idle TConnection objects.
     * If a TConnection object is found (either on connection close or between
     * calls when resizeBufferEveryN_ is set) with more than this much memory
     * allocated to its write buffer, we destroy and construct that buffer with
     * writeBufferDefaultSize_ bytes.
     *
     * @param limit of bytes beyond which we will shrink buffers when idle.
     */
    void setIdleWriteBufferLimit(size_t limit)
    {
        idleWriteBufferLimit_ = limit;
    }

    /**
     * Get # of calls made between buffer size checks.  0 means disabled.
     *
     * @return # of calls between buffer size checks.
     */
    int32_t getResizeBufferEveryN() const
    {
        return resizeBufferEveryN_;
    }

    /**
     * Check buffer sizes every "count" calls.  This allows buffer limits
     * to be enforced for persistent connections with a controllable degree
     * of overhead. 0 disables checks except at connection close.
     *
     * @param count the number of calls between checks, or 0 to disable
     */
    void setResizeBufferEveryN(int32_t count)
    {
        resizeBufferEveryN_ = count;
    }

    /**
     * Main workhorse function, starts up the server listening on a port and
     * loops over the libevent handler.
     */
    void serve();

    /**
     * Causes the server to terminate gracefully (can be called from any thread).
     */
    void stop();

    /// Creates a socket to listen on and binds it to the local port.
    void createAndListenOnSocket();

    /**
     * Takes a socket created by createAndListenOnSocket() and sets various
     * options on it to prepare for use in the server.
     *
     * @param fd descriptor of socket to be initialized.
     */
    void listenSocket(THRIFT_SOCKET fd);

    /**
     * Register the optional user-provided event-base (for single-thread servers)
     *
     * This method should be used when the server is running in a single-thread
     * mode, and the event base is provided by the user (i.e., the caller).
     *
     * @param user_event_base the user-provided event-base. The user is
     * responsible for freeing the event base memory.
     */
    void registerEvents(event_base* user_event_base);

    /**
     * Returns the optional user-provided event-base (for single-thread servers).
     */
    event_base* getUserEventBase() const
    {
        return userEventBase_;
    }

private:
    /**
     * Callback function that the threadmanager calls when a task reaches
     * its expiration time.  It is needed to clean up the expired connection.
     *
     * @param task the runnable associated with the expired task.
     */
    void expireClose(boost::shared_ptr<Runnable> task);

    /**
     * Return an initialized connection object.  Creates or recovers from
     * pool a TConnection and initializes it with the provided socket FD
     * and flags.
     *
     * @param socket FD of socket associated with this connection.
     * @param addr the sockaddr of the client
     * @param addrLen the length of addr
     * @return pointer to initialized TConnection object.
     */
    TConnection* createConnection(THRIFT_SOCKET socket, const sockaddr* addr,
                                  socklen_t addrLen);

    /**
     * Returns a connection to pool or deletion.  If the connection pool
     * (a stack) isn't full, place the connection object on it, otherwise
     * just delete it.
     *
     * @param connection the TConnection being returned.
     */
    void returnConnection(TConnection* connection);
};

/**
 * One libevent IO thread owned by a TNonblockingServer.  Each instance runs
 * its own event_base loop; the thread that holds the listen socket also
 * accepts new connections, and all threads receive task-completion
 * notifications over an internal pipe.
 */
class TNonblockingIOThread : public Runnable
{
public:
    // Creates an IO thread and sets up the event base.  The listenSocket should
    // be a valid FD on which listen() has already been called.  If the
    // listenSocket is < 0, accepting will not be done.
    TNonblockingIOThread(TNonblockingServer* server,
                         int number,
                         THRIFT_SOCKET listenSocket,
                         bool useHighPriority);

    ~TNonblockingIOThread();

    // Returns the event-base for this thread.
    event_base* getEventBase() const
    {
        return eventBase_;
    }

    // Returns the server for this thread.
    TNonblockingServer* getServer() const
    {
        return server_;
    }

    // Returns the number of this IO thread.
    int getThreadNumber() const
    {
        return number_;
    }

    // Returns the thread id associated with this object.  This should
    // only be called after the thread has been started.
    Thread::id_t getThreadId() const
    {
        return threadId_;
    }

    // Returns the send-fd for task complete notifications.
    evutil_socket_t getNotificationSendFD() const
    {
        return notificationPipeFDs_[1];
    }

    // Returns the read-fd for task complete notifications.
    evutil_socket_t getNotificationRecvFD() const
    {
        return notificationPipeFDs_[0];
    }

    // Returns the actual thread object associated with this IO thread.
    boost::shared_ptr<Thread> getThread() const
    {
        return thread_;
    }

    // Sets the actual thread object associated with this IO thread.
    void setThread(const boost::shared_ptr<Thread>& t)
    {
        thread_ = t;
    }

    // Used by TConnection objects to indicate processing has finished.
    bool notify(TNonblockingServer::TConnection* conn);

    // Enters the event loop and does not return until a call to stop().
    virtual void run();

    // Exits the event loop as soon as possible.
    void stop();

    // Ensures that the event-loop thread is fully finished and shut down.
    void join();

    /// Registers the events for the notification & listen sockets
    void registerEvents();

private:
    /**
     * C-callable event handler for signaling task completion.  Provides a
     * callback that libevent can understand that will read a connection
     * object's address from a pipe and call connection->transition() for
     * that object.
     *
     * @param fd the descriptor the event occurred on.
     */
    static void notifyHandler(evutil_socket_t fd, short which, void* v);

    /**
     * C-callable event handler for listener events.  Provides a callback
     * that libevent can understand which invokes server->handleEvent().
     *
     * @param fd the descriptor the event occurred on.
     * @param which the flags associated with the event.
     * @param v void* callback arg where we placed TNonblockingServer's "this".
     */
    static void listenHandler(evutil_socket_t fd, short which, void* v)
    {
        ((TNonblockingServer*)v)->handleEvent(fd, which);
    }

    /// Exits the loop ASAP in case of shutdown or error.
    void breakLoop(bool error);

    /// Create the pipe used to notify I/O process of task completion.
    void createNotificationPipe();

    /// Unregisters our events for notification and listen sockets.
    void cleanupEvents();

    /// Sets (or clears) high priority scheduling status for the current thread.
    void setCurrentThreadHighPriority(bool value);

private:
    /// associated server
    TNonblockingServer* server_;

    /// thread number (for debugging).
    const int number_;

    /// The actual physical thread id.
    Thread::id_t threadId_;

    /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
    THRIFT_SOCKET listenSocket_;

    /// Sets a high scheduling priority when running
    bool useHighPriority_;

    /// pointer to eventbase to be used for looping
    event_base* eventBase_;

    /// Set to true if this class is responsible for freeing the event base
    /// memory.
    bool ownEventBase_;

    /// Used with eventBase_ for connection events (only in listener thread)
    struct event serverEvent_;

    /// Used with eventBase_ for task completion notification
    struct event notificationEvent_;

    /// File descriptors for pipe used for task completion notification.
    evutil_socket_t notificationPipeFDs_[2];

    /// Actual IO Thread
    boost::shared_ptr<Thread> thread_;
};

}
}
} // apache::thrift::server

#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
